package com.zhang.third.utils;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * Flink source that emits a synthetic click {@link Event} roughly every 100 ms until cancelled.
 *
 * <p>Each event combines a user name and a URL picked uniformly at random from small fixed
 * pools with the wall-clock time (epoch milliseconds) at which the event was created.
 *
 * @author zhang
 * @date 2022/4/2 12:16
 */
public class ClickEventSource implements SourceFunction<Event> {
    // Written by cancel() and read by the run() loop. Declared volatile so the write is
    // visible to the emitting thread (Flink is expected to call cancel() from a different
    // thread than the one executing run()).
    private volatile boolean running = true;
    private final Random random = new Random();
    // Candidate user names for generated events.
    private final String[] userArray = {"Mary", "Bob", "Alice", "John", "Liz"};
    // Candidate URLs for generated events.
    private final String[] urlArray = {"./home", "./cart", "./buy", "./prod?id=1"};

    /**
     * Emits one random click event per iteration, sleeping 100 ms in between, until
     * {@link #cancel()} clears the running flag.
     *
     * <p>Submitting the job (e.g. {@code flink run <jar>}) triggers execution of this method.
     *
     * @param ctx context used to emit the generated events
     * @throws Exception if the pause between events is interrupted
     */
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // Loop on the flag rather than `true`, otherwise cancel() has no effect.
        while (running) {
            ctx.collect(new Event(
                    userArray[random.nextInt(userArray.length)],
                    urlArray[random.nextInt(urlArray.length)],
                    // Same epoch-millis value Calendar would give, without allocating one per event.
                    System.currentTimeMillis()
            ));
            Thread.sleep(100L);
        }
    }

    /**
     * Requests that {@link #run(SourceContext)} stop after its current iteration.
     *
     * <p>Cancelling the job (e.g. {@code flink cancel <jobID>}) triggers execution of this method.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
